import luigi
import psycopg2

# gaussdb配置
class GaussDBTask:
    def __init__(self):
        self.host = "xxx.xxx.xxx.xxx"
        self.port = "xxxx"
        self.user = "xxxx"
        self.password = "xxxxxx"
        self.database = "xxxxxx"

# 建表
class CreateTableTask(luigi.Task):
    def connect(self):
        return psycopg2.connect(
            host=gaussDBTask.host,
            port=gaussDBTask.port,
            user=gaussDBTask.user,
            password=gaussDBTask.password,
            database=gaussDBTask.database
        )

    def output(self):
        return []

    def run(self):
        conn = self.connect()
        cursor = conn.cursor()
        print("成功连接到数据库")
        create_table_query = """
        DROP TABLE IF EXISTS xjw.student_tb;
        CREATE TABLE xjw.student_tb (
            id SERIAL PRIMARY KEY,
            name VARCHAR(255),
            age INTEGER
        );
        """
        cursor.execute(create_table_query)
        conn.commit()
        print("建表成功")
        cursor.close()
        conn.close()

# 插入数据
class InsertDataTask(CreateTableTask):
    def output(self):
        return []

    def run(self):
        conn = self.connect()
        cursor = conn.cursor()
        insert_query = """
        INSERT INTO xjw.student_tb (name, age) VALUES ('xiaowang', 20);
        INSERT INTO xjw.student_tb (name, age) VALUES ('xiaohong', 18);
        """
        cursor.execute(insert_query)
        conn.commit()
        print("插入成功")
        cursor.close()
        conn.close()

# 更新数据
class UpdateDataTask(CreateTableTask):
    def output(self):
        return []

    def run(self):
        conn = self.connect()
        cursor = conn.cursor()
        update_query = """
        UPDATE xjw.student_tb SET age = 18 WHERE name = 'xiaowang';
        """
        cursor.execute(update_query)
        conn.commit()
        print("更新成功")
        cursor.close()
        conn.close()

# 删除数据
class DeleteDataTask(CreateTableTask):
    def output(self):
        return []

    def run(self):
        conn = self.connect()
        cursor = conn.cursor()
        delete_query = """
        DELETE FROM xjw.student_tb WHERE name = 'xiaohong';
        """
        cursor.execute(delete_query)
        conn.commit()
        print("删除成功")
        cursor.close()
        conn.close()

# 查询数据
class QueryDataTask(CreateTableTask):
    def output(self):
        return []

    def run(self):
        conn = self.connect()
        cursor = conn.cursor()
        query = """
        SELECT * FROM xjw.student_tb;
        """
        cursor.execute(query)
        results = cursor.fetchall()
        for row in results:
            print(row)
        print("查询成功")
        cursor.close()
        conn.close()

if __name__ == '__main__':
    gaussDBTask = GaussDBTask()
    # luigi.build([CreateTableTask()], local_scheduler=True)
    luigi.build([CreateTableTask(), InsertDataTask(), UpdateDataTask(), DeleteDataTask(), QueryDataTask()], local_scheduler=True)
